package cluster

import (
	"encoding/json"
	"fmt"
	"reflect"
	"strconv"
	"sync"

	"github.com/coreos/etcd/mvcc/mvccpb"

	"github.com/AsynkronIT/protoactor-go/actor"
	"github.com/coreos/etcd/clientv3"

	nactor "nggs/actor"
	ndebug "nggs/debug"
	netcd "nggs/etcd"
	netcdv3 "nggs/etcd/v3"
	nexport "nggs/export"
	nlog "nggs/log"
	nservice "nggs/service"
)

const (
	etcdLeaseTimeoutSecond = 5
)

var C = NewCluster(nil, netcdv3.C)

type WatchCallback func(err error, section string, serviceName string, serviceID int, ev *clientv3.Event)

type Cluster struct {
	*nactor.Actor

	startedWg sync.WaitGroup
	stoppedWg sync.WaitGroup

	etcdConfig netcd.Config
	etcdClient *netcdv3.Client
	leaseID    clientv3.LeaseID
	keepLiveCh <-chan *clientv3.LeaseKeepAliveResponse

	globalConfigReflectType reflect.Type
	globalConfig            nexport.IClusterGlobalConfig

	serviceGroups map[string]*nservice.Group // serviceName -> serviceGroup

	watchCallbacks []WatchCallback
}

func NewCluster(logger nlog.ILogger, etcdClient *netcdv3.Client) (c *Cluster) {
	c = &Cluster{
		etcdClient:    etcdClient,
		serviceGroups: map[string]*nservice.Group{},
	}
	if c.etcdClient == nil {
		c.etcdClient = netcdv3.New()
	}
	c.Actor = nactor.New(
		nactor.WithLogger(logger),
		nactor.WithStartedWaitGroup(&c.startedWg),
		nactor.WithStoppedWaitGroup(&c.stoppedWg),
		nactor.WithOnReceiveMessage(c.onReceiveMessage),
		nactor.WithOnStopping(c.onStopping),
	)
	return
}

type ServiceGroupConfig struct {
	GlobalConfig nexport.IServiceGlobalConfig
	Config       nexport.IServiceConfig
}

type ServiceGroupConfigMap = map[string]ServiceGroupConfig

func (c *Cluster) Init(etcdConfigFilePath string, globalConfig nexport.IClusterGlobalConfig, serviceGroupConfigMap ServiceGroupConfigMap) (err error) {
	err = c.etcdConfig.LoadFromFile(etcdConfigFilePath)
	if err != nil {
		err = fmt.Errorf("load etcd config from file fail, file path=%s, %w", etcdConfigFilePath, err)
		return
	}

	err = c.etcdConfig.TidyAndCheck()
	if err != nil {
		err = fmt.Errorf("tidy and check etcd config fail, %w", err)
		return
	}

	err = c.etcdClient.Init(c.etcdConfig.Endpoint)
	if err != nil {
		err = fmt.Errorf("init etcd client fail, endpoint=%s, %w", c.etcdConfig.Endpoint, err)
		return
	}

	c.leaseID, err = c.etcdClient.GrantLease(etcdLeaseTimeoutSecond)
	if err != nil {
		err = fmt.Errorf("grant etcd lease fail, time out second=%d, %w", etcdLeaseTimeoutSecond, err)
		return err
	}

	c.keepLiveCh, err = c.etcdClient.KeepAliveLease(c.leaseID)
	if err != nil {
		err = fmt.Errorf("get etcd keep alive lease fail, %w", err)
		return err
	}

	if globalConfig != nil {
		c.globalConfigReflectType = reflect.TypeOf(globalConfig).Elem()
	}

	for serviceName, serviceGroupConfig := range serviceGroupConfigMap {
		if serviceName == "" {
			err = fmt.Errorf("service group name can not be empty")
			return
		}

		if _, ok := c.serviceGroups[serviceName]; ok {
			err = fmt.Errorf("service group already exist, name=[%s]", serviceName)
			return
		}

		if serviceGroupConfig.Config == nil {
			err = fmt.Errorf("service group config can not be nil, name=[%s]", serviceName)
			return
		}

		sg := nservice.NewGroup(serviceName)

		if serviceGroupConfig.GlobalConfig != nil {
			sg.GlobalConfigReflectType = reflect.TypeOf(serviceGroupConfig.GlobalConfig).Elem()
		}

		sg.ConfigReflectType = reflect.TypeOf(serviceGroupConfig.Config).Elem()

		c.serviceGroups[serviceName] = sg
	}

	err = c.loadAndCheckConfig()
	if err != nil {
		err = fmt.Errorf("load and check config fail, %w", err)
		return
	}

	err = c.Start(nil, "cluster")
	if err != nil {
		err = fmt.Errorf("start cluster fail, %w", err)
		return
	}
	c.WaitForStarted()

	// 开启监听etcd的协程
	rch := c.etcdClient.WatchWithPrefix(c.etcdConfig.Root)
	var wg sync.WaitGroup
	wg.Add(1)
	go netcdv3.WatchLoop(rch, func(ev *clientv3.Event) {
		nactor.RootContext.Send(c.PID(), ev)
	}, &wg)
	wg.Wait()

	return
}

func (c *Cluster) loadAndCheckConfig() (err error) {
	if c.globalConfigReflectType != nil {
		k := c.GetGlobalConfigKey()
		v, e := c.etcdClient.Get(k)
		if e != nil {
			err = fmt.Errorf("get cluster global config fail, key=[%s], %w", k, e)
			return
		}
		if len(v) > 0 {
			c.globalConfig = reflect.New(c.globalConfigReflectType).Interface().(nexport.IClusterGlobalConfig)
			if e := json.Unmarshal(v, c.globalConfig); e != nil {
				err = fmt.Errorf("unmarshal cluster global config fail, content=[%s], %w", string(v), e)
				return
			}
			if e := c.globalConfig.TidyAndCheck(); e != nil {
				err = fmt.Errorf("tidy and check cluster global config fail, %w", e)
				return
			}
		}
	}

	for _, sg := range c.serviceGroups {
		if sg.GlobalConfigReflectType != nil {
			k := c.GenConfigKeyS(sg.Name, nservice.GlobalConfigServiceID)
			v, e := c.etcdClient.Get(k)
			if e != nil {
				err = fmt.Errorf("get service[%s] global config fail, key=[%s], %w", sg.Name, k, e)
				return
			}

			if len(v) > 0 {
				sg.GlobalConfig = reflect.New(sg.GlobalConfigReflectType).Interface().(nexport.IServiceGlobalConfig)
				if e := json.Unmarshal(v, sg.GlobalConfig); e != nil {
					err = fmt.Errorf("unmarshal service[%s] global config fail, content=[%s], %w", sg.Name, string(v), e)
					return
				}
				if e := sg.GlobalConfig.TidyAndCheck(); e != nil {
					err = fmt.Errorf("tidy and check service[%s] global config fail, %w", sg.Name, e)
					return
				}
			}
		}

		configPrefix := c.GenConfigKeyPrefix(sg.Name)
		cfgs, e := c.etcdClient.GetStrings(configPrefix)
		if e != nil {
			err = fmt.Errorf("get service[%s] configs fail, config prefix=[%s], %w", sg.Name, configPrefix, e)
			return
		}

		var id int
		for k, v := range cfgs {
			_, e := fmt.Sscanf(k, configPrefix+":%d", &id)
			if e != nil {
				continue
			}

			if sg.GetConfigByID(id) != nil {
				err = fmt.Errorf("duplicate service[%s] config, id=%d", sg.Name, id)
				return
			}

			configData := []byte(v)

			iConfig := reflect.New(sg.ConfigReflectType).Interface()
			if e = json.Unmarshal(configData, iConfig); e != nil {
				err = fmt.Errorf("unmarshal service[%s] config fail, content=[%s], %w", sg.Name, v, e)
				return
			}

			s := nservice.NewInfo(iConfig.(nexport.IServiceConfig), id)
			if s.Config.GetID() != id {
				err = fmt.Errorf("service[%s] config id not match, [%d] != [%d]", sg.Name, s.Config.GetID(), id)
				return
			}

			if e := s.Config.TidyAndCheck(); e != nil {
				err = fmt.Errorf("tidy and check service[%s] config fail, id=[%d], %w", sg.Name, id, e)
				return
			}

			// 获取实例
			instance, e := c.directGetInstance(sg.Name, id)
			if e == nil && !instance.IsEmpty() {
				s.Instance = nservice.NewInstance()
				*s.Instance = instance

				payload, e := c.directGetPayload(sg.Name, id)
				if e != nil {
					// 能取到实例必然能取到负载
					err = fmt.Errorf("direct get service[%s] payload fail, id=[%d], %w", sg.Name, id, e)
					return
				}
				s.Payload = payload
			}

			sg.AddInfo(id, s)
		}

		c.serviceGroups[sg.Name] = sg
	}

	return
}

func (c *Cluster) getServiceGroup(serviceName string) *nservice.Group {
	sg, ok := c.serviceGroups[serviceName]
	if ok {
		return sg
	}
	return nil
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (c *Cluster) GetGlobalConfigKey() string {
	return GenKey(c.EtcdConfig().Root, SectionConfig, "global")
}

func (c *Cluster) GenConfigKeyPrefix(serviceName string) string {
	return GenKey(c.EtcdConfig().Root, SectionConfig, serviceName)
}

func (c *Cluster) GenConfigKeyS(serviceName string, strServiceID string) string {
	return GenKey(c.GenConfigKeyPrefix(serviceName), strServiceID)
}

func (c *Cluster) GenConfigKey(serviceName string, serviceID int) string {
	return GenKey(c.GenConfigKeyPrefix(serviceName), serviceID)
}

func (c *Cluster) ParseConfigKeyS(key string) (serviceName string, strServiceID string, err error) {
	var section string
	_, section, serviceName, strServiceID, err = ParseKeyS(key)
	if err != nil {
		return
	}
	if section != SectionConfig {
		err = fmt.Errorf("not config")
		return
	}
	return
}

func (c *Cluster) ParseConfigKey(key string) (serviceName string, serviceID int, err error) {
	var strServiceID string
	serviceName, strServiceID, err = c.ParseConfigKeyS(key)
	if err != nil {
		return
	}
	if strServiceID != "" {
		serviceID, err = strconv.Atoi(strServiceID)
	}
	return
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (c *Cluster) GenInstanceKeyPrefix(serviceName string) string {
	return GenKey(c.EtcdConfig().Root, SectionInstance, serviceName)
}

func (c *Cluster) GenInstanceKey(serviceName string, serviceID int) string {
	return GenKey(c.GenInstanceKeyPrefix(serviceName), serviceID)
}

func (c *Cluster) GenInstanceKeyS(serviceName string, strServiceID string) string {
	return GenKey(c.GenInstanceKeyPrefix(serviceName), strServiceID)
}

func (c *Cluster) ParseInstanceKey(key string) (serviceName string, serviceID int, err error) {
	var strServiceID string
	serviceName, strServiceID, err = c.ParseInstanceKeyS(key)
	if err != nil {
		return
	}
	serviceID, err = strconv.Atoi(strServiceID)
	return
}

func (c *Cluster) ParseInstanceKeyS(key string) (serviceName string, strServiceID string, err error) {
	var section string
	_, section, serviceName, strServiceID, err = ParseKeyS(key)
	if err != nil {
		return
	}
	if section != SectionInstance {
		err = fmt.Errorf("not instance")
		return
	}
	return
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (c *Cluster) GenPayloadKeyPrefix(serviceName string) string {
	return GenKey(c.EtcdConfig().Root, SectionPayload, serviceName)
}

func (c *Cluster) GenPayloadKey(serviceName string, serviceID int) string {
	return GenKey(c.GenPayloadKeyPrefix(serviceName), serviceID)
}

func (c *Cluster) GenPayloadKeyS(serviceName string, strServiceID string) string {
	return GenKey(c.GenPayloadKeyPrefix(serviceName), strServiceID)
}

func (c *Cluster) ParsePayloadKey(key string) (serviceName string, serviceID int, err error) {
	var strServiceID string
	serviceName, strServiceID, err = c.ParsePayloadKeyS(key)
	if err != nil {
		return
	}
	serviceID, err = strconv.Atoi(strServiceID)
	return
}

func (c *Cluster) ParsePayloadKeyS(key string) (serviceName string, strServiceID string, err error) {
	var section string
	_, section, serviceName, strServiceID, err = ParseKeyS(key)
	if err != nil {
		return
	}
	if section != SectionPayload {
		err = fmt.Errorf("not payload")
		return
	}
	return
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (c *Cluster) directGetInstance(serviceName string, serviceID int) (instance nservice.Instance, err error) {
	key := c.GenInstanceKey(serviceName, serviceID)
	value, err := c.etcdClient.Get(key)
	if err != nil {
		return
	}
	err = json.Unmarshal(value, &instance)
	if err != nil {
		return
	}
	return
}

func (c *Cluster) directPutInstance(serviceName string, serviceID int, instance nservice.Instance) error {
	key := c.GenInstanceKey(serviceName, serviceID)
	value, err := json.Marshal(instance)
	if err != nil {
		return err
	}
	err = c.etcdClient.PutWithLease(key, string(value), c.LeaseID())
	if err != nil {
		return err
	}
	return nil
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (c *Cluster) directGetPayload(serviceName string, serviceID int) (payload int, err error) {
	key := c.GenPayloadKey(serviceName, serviceID)
	value, err := c.etcdClient.GetString(key)
	if err != nil {
		return
	}
	payload, err = strconv.Atoi(value)
	if err != nil {
		return
	}
	return
}

func (c *Cluster) directPutPayload(serviceName string, serviceID int, payload int) (err error) {
	key := c.GenPayloadKey(serviceName, serviceID)
	value := strconv.Itoa(payload)
	err = c.etcdClient.PutWithLease(key, string(value), c.LeaseID())
	if err != nil {
		return
	}
	return
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (c *Cluster) Close() {
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (c *Cluster) onReceiveMessage(ctx actor.Context) {
	defer func() {
		if r := recover(); r != nil {
			c.Error("%v\n%s", r, ndebug.Stack())
			panic(r)
		}
	}()

	sender := ctx.Sender()
	switch recv := ctx.Message().(type) {
	case *clientv3.Event:
		_, section, serviceName, serviceID, err := ParseKey(string(recv.Kv.Key))
		defer func() {
			for _, cb := range c.watchCallbacks {
				if cb != nil {
					cb(err, section, serviceName, serviceID, recv)
				}
			}
		}()

		if err != nil {
			return
		}

		if serviceName == GlobalConfigName {
			if c.globalConfigReflectType == nil {
				err = fmt.Errorf("global config not register")
				return
			}

			switch section {
			case SectionConfig:
				switch recv.Type {
				case mvccpb.PUT:
					iGlobalConfig := reflect.New(c.globalConfigReflectType).Interface()
					if err = json.Unmarshal(recv.Kv.Value, iGlobalConfig); err != nil {
						err = fmt.Errorf("unmarshal cluster global config fail, content=[%s], %v", string(recv.Kv.Value), err)
						return
					}

					globalConfig := iGlobalConfig.(nexport.IClusterGlobalConfig)
					if err = globalConfig.TidyAndCheck(); err != nil {
						err = fmt.Errorf("tidy and check cluster global config fail, %w", err)
						return
					}

					c.globalConfig = globalConfig

				case mvccpb.DELETE:
					if c.globalConfig == nil {
						err = fmt.Errorf("cluster global config not exist")
						return
					}
					c.globalConfig = nil
				}

			default:
				err = fmt.Errorf("unsupported cluster global config section[%s]", section)
			}
			return
		}

		sg := c.getServiceGroup(serviceName)
		if sg == nil {
			err = fmt.Errorf("get service group [%s] fail", serviceName)
			return
		}

		switch section {
		case SectionConfig:
			switch recv.Type {
			case mvccpb.PUT:
				if serviceID > 0 {
					if sg.ConfigReflectType == nil {
						err = fmt.Errorf("service[%s] config not register", serviceName)
						return
					}

					iConfig := reflect.New(sg.ConfigReflectType).Interface()
					if err = json.Unmarshal(recv.Kv.Value, iConfig); err != nil {
						err = fmt.Errorf("unmarshal service[%s] config json, content=[%s] fail, %v", serviceName, string(recv.Kv.Value), err)
						return
					}

					config := iConfig.(nexport.IServiceConfig)
					if config.GetID() != serviceID {
						err = fmt.Errorf("service[%s] config id not match, [%d] != [%d]", serviceName, config.GetID(), serviceID)
						return
					}
					if err = config.TidyAndCheck(); err != nil {
						err = fmt.Errorf("tidy and check %s[%d] config fail, %w", serviceName, serviceID, err)
						return
					}

					s := sg.GetInfoByID(serviceID)
					if s == nil {
						s = nservice.NewInfo(config, serviceID)
						sg.AddInfo(serviceID, s)
					} else {
						s.Config = config
					}
				} else {
					if sg.GlobalConfigReflectType == nil {
						err = fmt.Errorf("service[%s] global config not register", serviceName)
						return
					}

					iGlobalConfig := reflect.New(sg.GlobalConfigReflectType).Interface()
					if err = json.Unmarshal(recv.Kv.Value, iGlobalConfig); err != nil {
						err = fmt.Errorf("unmarshal service[%s] global config json, content=[%s] fail, %v", serviceName, string(recv.Kv.Value), err)
						return
					}

					globalConfig := iGlobalConfig.(nexport.IServiceGlobalConfig)
					if err = globalConfig.TidyAndCheck(); err != nil {
						err = fmt.Errorf("tidy and check %s[%d] global config fail, %w", serviceName, serviceID, err)
						return
					}

					sg.GlobalConfig = globalConfig
				}

			case mvccpb.DELETE:
				sg.RemoveInfo(serviceID)
			}

		case SectionInstance:
			s := sg.GetInfoByID(serviceID)
			if s == nil {
				err = fmt.Errorf("get %s[%d] info fail", serviceName, serviceID)
				return
			}

			switch recv.Type {
			case mvccpb.PUT:
				instance := nservice.NewInstance()
				if err = json.Unmarshal(recv.Kv.Value, instance); err != nil {
					err = fmt.Errorf("unmarshal %s[%d] instance fail, content=[%s] fail, %v",
						serviceName, serviceID, string(recv.Kv.Value), err)
					return
				}
				// 需要避免添加一个空的instance
				if instance.IsEmpty() {
					err = fmt.Errorf("can not put a empty %s[%d] instance", serviceName, serviceID)
					return
				}
				s.Instance = instance

			case mvccpb.DELETE:
				if s.Instance.IsEmpty() {
					err = fmt.Errorf("already delete %s[%d] instance", serviceName, serviceID)
					return
				}
				s.Instance = nil
			}

		case SectionPayload:
			s := sg.GetInfoByID(serviceID)
			if s == nil {
				err = fmt.Errorf("get %s[%d] info fail", serviceName, serviceID)
				return
			}

			switch recv.Type {
			case mvccpb.PUT:
				if s.Instance.IsEmpty() {
					err = fmt.Errorf("can not put %s[%d] payload, instance is empty", serviceName, serviceID)
					return
				}
				payload, e := strconv.Atoi(string(recv.Kv.Value))
				if e != nil {
					err = fmt.Errorf("put %s[%d] payload fail, %v", serviceName, serviceID, e)
					return
				}
				s.Payload = payload

			case mvccpb.DELETE:
				if s.Payload == nservice.NilPayload {
					err = fmt.Errorf("already delete %s[%d] payload", serviceName, serviceID)
					return
				}
				s.Payload = nservice.NilPayload
			}

		default:
			err = fmt.Errorf("unsupported section[%s]", section)
		}

	case *requestGlobalConfig:
		send := &responseGlobalConfig{}
		defer func() {
			if sender != nil {
				ctx.Send(sender, send)
			}
		}()

		if c.globalConfig != nil {
			send.globalConfig = c.globalConfig.Clone()
		}

	case *requestServiceGlobalConfig:
		send := &responseServiceGlobalConfig{}
		defer func() {
			if sender != nil {
				ctx.Send(sender, send)
			}
		}()

		sg := c.getServiceGroup(recv.name)
		if sg == nil {
			send.err = ErrServiceGroupNotExist
		} else {
			if sg.GlobalConfig != nil {
				send.serviceGlobalConfig = sg.GlobalConfig.Clone()
			}
		}

	case *requestServiceConfig:
		send := &responseServiceConfig{}
		defer func() {
			if sender != nil {
				ctx.Send(sender, send)
			}
		}()

		sg := c.getServiceGroup(recv.name)
		if sg == nil {
			send.err = ErrServiceGroupNotExist
		} else {
			info := sg.GetInfoByID(recv.id)
			if info.IsEmpty() {
				send.err = ErrServiceInfoNotExist
			} else {
				send.serviceConfig = info.Config.Clone()
			}
		}

	case *requestServiceInfo:
		send := &responseServiceInfo{}
		defer func() {
			if sender != nil {
				ctx.Send(sender, send)
			}
		}()

		sg := c.getServiceGroup(recv.name)
		if sg == nil {
			send.err = ErrServiceInfoNotExist
		} else {
			info := sg.GetInfoByID(recv.id)
			if info.IsEmpty() {
				send.err = ErrServiceInfoNotExist
			} else {
				send.info = info.Clone()
			}
		}

	case *requestServiceInstance:
		send := &responseServiceInstance{}
		defer func() {
			if sender != nil {
				ctx.Send(sender, send)
			}
		}()

		sg := c.getServiceGroup(recv.name)
		if sg == nil {
			send.err = ErrServiceGroupNotExist
		} else {
			info := sg.GetInfoByID(recv.id)
			if info.IsEmpty() {
				send.err = ErrServiceInfoNotExist
			} else {
				send.instance = info.Instance.Clone()
			}
		}

	case *requestServicePayload:
		send := &responseServicePayload{}
		defer func() {
			if sender != nil {
				ctx.Send(sender, send)
			}
		}()

		sg := c.getServiceGroup(recv.name)
		if sg == nil {
			send.err = ErrServiceGroupNotExist
		} else {
			info := sg.GetInfoByID(recv.id)
			if info.IsEmpty() {
				send.err = ErrServiceInfoNotExist
			} else {
				send.payload = info.Payload
			}
		}

	case *requestPutServiceInstance:
		send := &responsePutServiceInstance{}
		defer func() {
			if sender != nil {
				ctx.Send(sender, send)
			}
		}()

		sg := c.getServiceGroup(recv.name)
		if sg == nil {
			send.err = ErrServiceGroupNotExist
		} else {
			info := sg.GetInfoByID(recv.id)
			if info.IsEmpty() {
				send.err = ErrServiceInfoNotExist
			} else {
				instance := recv.instance.Clone()

				key := c.GenInstanceKey(recv.name, recv.id)
				value, err := json.Marshal(instance)
				if err != nil {
					send.err = fmt.Errorf("marshal instance fail, %w", err)
					return
				}

				err = c.etcdClient.PutWithLease(key, string(value), c.LeaseID())
				if err != nil {
					send.err = fmt.Errorf("put etcd fail, %w", err)
					return
				}

				info.Instance = instance

				return
			}
		}

	case *requestTryLockServicePosition:
		send := &responseTryLockServicePosition{}
		defer func() {
			if sender != nil {
				ctx.Send(sender, send)
			}
		}()

		sg := c.getServiceGroup(recv.name)
		if sg == nil {
			send.err = ErrServiceGroupNotExist
		} else {
			info := sg.GetInfoByID(recv.id)
			if info.IsEmpty() {
				send.err = ErrServiceInfoNotExist
			} else {
				if info.Instance != nil {
					send.err = ErrServiceInstanceAlreadyExist
					return
				}

				instance := recv.instance.Clone()

				key := c.GenInstanceKey(recv.name, recv.id)
				value, err := json.Marshal(instance)
				if err != nil {
					send.err = fmt.Errorf("marshal instance fail, %w", err)
					return
				}

				err = c.etcdClient.PutWithLease(key, string(value), c.LeaseID())
				if err != nil {
					send.err = fmt.Errorf("put instance to etcd fail, %w", err)
					return
				}

				// todo 目前还未做负载均衡，写死0
				key = c.GenPayloadKey(recv.name, recv.id)
				err = c.etcdClient.PutWithLease(key, "0", c.LeaseID())
				if err != nil {
					send.err = fmt.Errorf("put payload to etcd fail, %w", err)
				}

				info.Instance = instance
			}
		}

	case *requestPutServicePayload:
		send := &responsePutServicePayload{}
		defer func() {
			if sender != nil {
				ctx.Send(sender, send)
			}
		}()

		sg := c.getServiceGroup(recv.name)
		if sg == nil {
			send.err = ErrServiceGroupNotExist
		} else {
			info := sg.GetInfoByID(recv.id)
			if info.IsEmpty() {
				send.err = ErrServiceInfoNotExist
			} else {
				key := c.GenPayloadKey(recv.name, recv.id)
				value := strconv.Itoa(int(recv.payload))
				err := c.etcdClient.PutWithLease(key, value, c.LeaseID())
				if err != nil {
					send.err = fmt.Errorf("put payload to etcd fail, %w", err)
					return
				}

				info.Payload = recv.payload
			}
		}

	case *requestServiceGroupAllInfo:
		send := &responseServiceGroupAllInfo{}
		defer func() {
			if sender != nil {
				ctx.Send(sender, send)
			}
		}()

		sg := c.getServiceGroup(recv.name)
		if sg == nil {
			send.err = ErrServiceGroupNotExist
		} else {
			sg.EachInfo(func(id int, info *nservice.Info) bool {
				send.infos = append(send.infos, info.Clone())
				return true
			})
		}

	case *requestServiceInfoNum:
		send := &responseServiceInfoNum{}
		defer func() {
			if sender != nil {
				ctx.Send(sender, send)
			}
		}()

		sg := c.getServiceGroup(recv.name)
		if sg == nil {
			send.err = ErrServiceGroupNotExist
		} else {
			send.num = sg.Infos.Size()
		}

	case *requestServiceInstanceNum:
		send := &responseServiceInstanceNum{}
		defer func() {
			if sender != nil {
				ctx.Send(sender, send)
			}
		}()

		sg := c.getServiceGroup(recv.name)
		if sg == nil {
			send.err = ErrServiceGroupNotExist
		} else {
			sg.EachInstance(func(s string, i int, instance *nservice.Instance) bool {
				if !instance.IsEmpty() {
					send.num += 1
				}
				return true
			})
		}

	case *requestRoundRobinPickService:
		send := &responseRoundRobinPickService{}
		defer func() {
			if sender != nil {
				ctx.Send(sender, send)
			}
		}()

		sg := c.getServiceGroup(recv.name)
		if sg == nil {
			send.err = ErrServiceInfoNotExist
		} else {
			info := sg.RoundRobinPickInstance()
			if !info.IsEmpty() {
				send.info = info.Clone()
			}
		}

	case *requestAddWatchCallback:
		send := &responseAddWatchCallback{}
		defer func() {
			if sender != nil {
				ctx.Send(sender, send)
			}
		}()
		if recv.cb == nil {
			send.err = ErrCallbackIsNil
			return
		}
		c.watchCallbacks = append(c.watchCallbacks, recv.cb)

	default:
		c.Error("recv unsupported msg [%#v] from [%v]", recv, sender)
	}
}

func (c *Cluster) onStopping(ctx actor.Context) {
	c.etcdClient.Close()
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (c *Cluster) EtcdConfig() netcd.Config {
	return c.etcdConfig
}

func (c *Cluster) LeaseID() clientv3.LeaseID {
	return c.leaseID
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (c *Cluster) GetGlobalConfig() (nexport.IClusterGlobalConfig, error) {
	recv, err := requestResponseGlobalConfig(c.PID(), &requestGlobalConfig{})
	if err != nil {
		return nil, err
	}
	return recv.globalConfig, nil
}

func (c *Cluster) GetServiceGlobalConfig(serviceName string) (nexport.IServiceGlobalConfig, error) {
	send := &requestServiceGlobalConfig{}
	send.name = serviceName
	recv, err := requestResponseServiceGlobalConfig(c.PID(), send)
	if err != nil {
		return nil, err
	}
	return recv.serviceGlobalConfig, recv.err
}

func (c *Cluster) GetServiceConfig(serviceName string, serviceID int) (nexport.IServiceConfig, error) {
	send := &requestServiceConfig{}
	send.name = serviceName
	send.id = serviceID
	recv, err := requestResponseServiceConfig(c.PID(), send)
	if err != nil {
		return nil, err
	}
	return recv.serviceConfig, recv.err
}

func (c *Cluster) GetServiceInfo(serviceName string, serviceID nexport.ServiceID) (*nservice.Info, error) {
	send := &requestServiceInfo{}
	send.name = serviceName
	send.id = serviceID
	recv, err := requestResponseServiceInfo(c.PID(), send)
	if err != nil {
		return nil, err
	}
	return recv.info, recv.err
}

func (c *Cluster) GetServiceInstance(serviceName string, serviceID nexport.ServiceID) (*nservice.Instance, error) {
	send := &requestServiceInstance{}
	send.name = serviceName
	send.id = serviceID
	recv, err := requestResponseServiceInstance(c.PID(), send)
	if err != nil {
		return nil, err
	}
	return recv.instance, recv.err
}

func (c *Cluster) PutServiceInstance(serviceName string, serviceID nexport.ServiceID, instance *nservice.Instance) error {
	send := &requestPutServiceInstance{}
	send.name = serviceName
	send.id = serviceID
	send.instance = instance.Clone()
	recv, err := requestResponsePutServiceInstance(c.PID(), send)
	if err != nil {
		return err
	}
	return recv.err
}

func (c *Cluster) TryLockServicePosition(serviceName string, serviceID nexport.ServiceID, instance *nservice.Instance) error {
	send := &requestTryLockServicePosition{}
	send.name = serviceName
	send.id = serviceID
	send.instance = instance.Clone()
	recv, err := requestResponseTryLockServicePosition(c.PID(), send)
	if err != nil {
		return err
	}
	return recv.err
}

func (c *Cluster) RoundRobinPickService(serviceName string) (info *nservice.Info, err error) {
	send := &requestRoundRobinPickService{}
	send.name = serviceName
	recv, err := requestResponseRoundRobinPickService(c.PID(), send)
	if err != nil {
		return nil, err
	}
	return recv.info, recv.err
}

func (c *Cluster) GetServicePayload(serviceName string, serviceID nexport.ServiceID) (nexport.Payload, error) {
	send := &requestServicePayload{}
	send.name = serviceName
	send.id = serviceID
	recv, err := requestResponseServicePayload(c.PID(), send)
	if err != nil {
		return nservice.NilPayload, err
	}
	return recv.payload, recv.err
}

func (c *Cluster) PutServicePayload(serviceName string, serviceID nexport.ServiceID, payload nexport.Payload) error {
	send := &requestPutServicePayload{}
	send.name = serviceName
	send.id = serviceID
	send.payload = payload
	recv, err := requestResponsePutServicePayload(c.PID(), send)
	if err != nil {
		return err
	}
	return recv.err
}

func (c *Cluster) GetServiceGroupAllInfo(serviceName string) ([]*nservice.Info, error) {
	send := &requestServiceGroupAllInfo{}
	send.name = serviceName
	recv, err := requestResponseServiceGroupAllInfo(c.PID(), send)
	if err != nil {
		return nil, err
	}
	return recv.infos, err
}

func (c *Cluster) GetServiceInfoNum(serviceName string) (num int, err error) {
	send := &requestServiceInfoNum{}
	send.name = serviceName
	recv, err := requestResponseServiceInfoNum(c.PID(), send)
	if err != nil {
		return
	}
	return recv.num, err
}

func (c *Cluster) GetServiceInstanceNum(serviceName string) (num int, err error) {
	send := &requestServiceInstanceNum{}
	send.name = serviceName
	recv, err := requestResponseServiceInstanceNum(c.PID(), send)
	if err != nil {
		return
	}
	return recv.num, err
}

func (c *Cluster) AddWatchCallback(cb WatchCallback) (err error) {
	send := &requestAddWatchCallback{}
	send.cb = cb
	recv, err := requestResponseAddWatchCallback(c.PID(), send)
	if err != nil {
		return err
	}
	return recv.err
}
